Treasure Data社のOSSワークフローエンジン『Digdag』を試してみた #digdag

Treasure Data社のOSSワークフローエンジン『Digdag』を試してみた #digdag

Clock Icon2016.06.16

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Digdag が Apache License 2.0 の元でオープンソース化されましたよ! さぁ試すんだ…! 今すぐにでも! https://t.co/Uzc4a5GLCe ドキュメント:https://t.co/PF8wy5KHln

— Sadayuki Furuhashi (@frsyuki) 2016年6月15日

という訳で試してみました。注目度の高かったワークフローエンジン『Digdag』がついにOSS化されました!Githubリポジトリ及びドキュメントは以下となります。

目次

 

インストール

 

環境の準備

今回は手っ取り早く触ってみよう!という事でEC2環境(Amazon Linux)を用意しました。

$ ssh -i xxxxxxx.pem [email protected]
Last login: Wed Jun 15 15:03:28 2016 from xxx.xxx.xxx.xxx

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2016.03-release-notes/
$ sudo yum -y update

 

Digdagのインストール実施

導入はとても簡単。ソースコードを入手し、実行権限を付与。

$ sudo curl -o /usr/local/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 18.6M  100 18.6M    0     0  1201k      0  0:00:15  0:00:15 --:--:--  690k
$ 
$ sudo chmod +x /usr/local/bin/digdag
$ 

digdag --helpを実行してみます。おや?エラーが出ますね...

$ digdag --help
Exception in thread "main" java.lang.UnsupportedClassVersionError: io/digdag/cli/Main : Unsupported major.minor version 52.0
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:803)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482)
$ 

このエラーに関する記述もドキュメントに記載されていました。Java8(8u72)以上の環境が必要となるようです。

$ java -version
java version "1.7.0_101"
OpenJDK Runtime Environment (amzn-2.6.6.1.67.amzn1-x86_64 u101-b00)
OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode)

既存Java環境をアンインストールし、

$ sudo yum -y remove java
$ java -version
-bash: /usr/bin/java: そのようなファイルやディレクトリはありません

RPMパッケージを入手、新しいバージョンのJavaをインストール。

$ sudo rpm -ivh jdk-8u92-linux-x64.rpm 
準備しています...              ################################# [100%]
更新中 / インストール中...
   1:jdk1.8.0_92-2000:1.8.0_92-fcs    ################################# [100%]
Unpacking JAR files...
	tools.jar...
	plugin.jar...
	javaws.jar...
	deploy.jar...
	rt.jar...
	jsse.jar...
	charsets.jar...
	localedata.jar...
$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
$ 

再度digdag --helpコマンドを実施。今度はちゃんとヘルプが表示されました!これでインストール完了です。

$ digdag --help
2016-06-15 15:35:29 +0000: Digdag v0.8.1
Usage: digdag <command> [options...]
  Local-mode commands:
    new <path>                       create a new workflow project
    r[un] <workflow.dig>             run a workflow
    c[heck]                          show workflow definitions
    sched[uler]                      run a scheduler server
    selfupdate                       update digdag to the latest version

  Server-mode commands:
    server                           start digdag server

  Client-mode commands:
    push <project-name>              create and upload a new revision
    start <project-name> <name>      start a new session attempt of a workflow
    retry <attempt-id>               retry a session
    kill <attempt-id>                kill a running session attempt
    backfill                         start sessions of a schedule for past times
    reschedule                       skip sessions of a schedule to a future time
    log <attempt-id>                 show logs of a session attempt
    workflows [project-name] [name]  show registered workflow definitions
    schedules                        show registered schedules
    sessions                         show sessions for all workflows
    sessions <project-name>          show sessions for all workflows in a project
    sessions <project-name> <name>   show sessions for a workflow
    session  <session-id>            show a single session
    attempts                         show attempts for all sessions
    attempts <session-id>            show attempts for a session
    attempt  <attempt-id>            show a single attempt
    tasks <attempt-id>               show tasks of a session attempt
    version                          show client and server version

  Options:
    -L, --log PATH                   output log messages to a file (default: -)
    -l, --log-level LEVEL            log level (error, warn, info, debug or trace)
    -X KEY=VALUE                     add a performance system config
    -c, --config PATH.properties     Configuration file (default: /home/ec2-user/.config/digdag/config)

Use `<command> --help` to see detailed usage of a command.
$ 

 

サンプルワークフローの実行

サンプルワークフローの実行についてもドキュメントでカバーされていますのでこちらも早速試してみます。

digdag initでワークフローの初期化/作成、

$ digdag init cmdag
2016-06-15 15:38:34 +0000: Digdag v0.8.1
  Creating cmdag/.gitignore
  Creating cmdag/tasks/shell_sample.sh
  Creating cmdag/tasks/repeat_hello.sh
  Creating cmdag/tasks/__init__.py
  Creating cmdag/cmdag.dig
Done. Type `cd cmdag` and then `digdag run cmdag.dig` to run the workflow. Enjoy!

作成されたフォルダに移動し、digdag runコマンドで拡張子*.digを指定して実行。幾つかログが表示された後、処理が正常終了しました。

$ cd cmdag/
$ digdag run cmdag.dig 
2016-06-15 15:40:44 +0000: Digdag v0.8.1
2016-06-15 15:40:46 +0000 [WARN] (main): Using a new session time 2016-06-15T00:00:00+00:00.
2016-06-15 15:40:46 +0000 [INFO] (main): Using session .digdag/status/20160615T000000+0000.
2016-06-15 15:40:46 +0000 [INFO] (main): Starting a new session project id=1 workflow name=cmdag session_time=2016-06-15T00:00:00+00:00
2016-06-15 15:40:47 +0000 [INFO] (0017@+cmdag+step1): sh>: tasks/shell_sample.sh
Step1 of session 2016-06-15T00:00:00+00:00
2016-06-15 15:40:47 +0000 [INFO] (0017@+cmdag+step2+worker1): sh>: tasks/repeat_hello.sh
Hello, world! from process 3334
2016-06-15 15:40:47 +0000 [INFO] (0019@+cmdag+step2+worker2): sh>: tasks/repeat_hello.sh
Hello, world! from process 3336
Hello, world! from process 3334
Hello, world! from process 3336
Hello, world! from process 3334
Hello, world! from process 3336
Hello, world! from process 3334
Hello, world! from process 3336
2016-06-15 15:40:51 +0000 [INFO] (0019@+cmdag+step3): py>: tasks.MyWorkflow.step3
Step3 of session 2016-06-15T00:00:00+00:00
Success. Task state is saved at .digdag/status/20160615T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
$

実行結果も気になりますが、まずは実行時に指定したファイルを見てみましょう。非常にシンプルな設定内容で何やらジョブ等が設定されています。step2の部分は_parallel: trueと指定があるので並行処理をさせているというのも把握出来ますね。

$ cat cmdag.dig 
timezone: UTC

_export:
  hello: "Hello, world!"

+step1:
  sh>: tasks/shell_sample.sh

+step2:
  _parallel: true

  +worker1:
    sh>: tasks/repeat_hello.sh

  +worker2:
    sh>: tasks/repeat_hello.sh

+step3:
  # defined at tasks/__init__.py
  py>: tasks.MyWorkflow.step3

$ 

設定ファイルで指定されていた実行ファイルの内容も確認して見ます。*.digファイルで定義した変数(hello)がこちらのファイルで受け渡されて出力されている様ですね。

$ cd tasks/
$ ll
合計 16
-rw-rw-r-- 1 ec2-user ec2-user 159  6月 15 15:38 __init__.py
-rw-rw-r-- 1 ec2-user ec2-user 578  6月 15 15:40 __init__.pyc
-rwxrw-r-- 1 ec2-user ec2-user 167  6月 15 15:38 repeat_hello.sh
-rwxrw-r-- 1 ec2-user ec2-user  49  6月 15 15:38 shell_sample.sh
$ cat shell_sample.sh 
#!/bin/sh

echo "Step1 of session $session_time"
$ cat repeat_hello.sh 
#!/bin/sh

echo "$hello from process $$"
sleep 1

echo "$hello from process $$"
sleep 1

echo "$hello from process $$"
sleep 1

echo "$hello from process $$"
sleep 1

$ 

タスクのステータスに関する情報が格納されているディレクトリに移動して中身を見てみます。非常にシンプルなタスクだったのか、詳細な内容までは出力されていませんが、処理それぞれの内容(*.yml)にstate: "success"という情報が出力されているのが確認出来ます。

$ pwd
/home/ec2-user/cmdag/.digdag/status/20160615T000000+0000
$ ll
合計 24
-rw-rw-r-- 1 ec2-user ec2-user 148  6月 15 15:40 +cmdag+step1.yml
-rw-rw-r-- 1 ec2-user ec2-user 156  6月 15 15:40 +cmdag+step2+worker1.yml
-rw-rw-r-- 1 ec2-user ec2-user 156  6月 15 15:40 +cmdag+step2+worker2.yml
-rw-rw-r-- 1 ec2-user ec2-user 148  6月 15 15:40 +cmdag+step2.yml
-rw-rw-r-- 1 ec2-user ec2-user 148  6月 15 15:40 +cmdag+step3.yml
-rw-rw-r-- 1 ec2-user ec2-user 142  6月 15 15:40 +cmdag.yml
$ cat +cmdag.yml 
fullName: "+cmdag"
state: "success"
result:
  subtaskConfig: {}
  exportParams: {}
  storeParams: {}
  report:
    inputs: []
    outputs: []
$ cat +cmdag+step1.yml 
fullName: "+cmdag+step1"
state: "success"
result:
  subtaskConfig: {}
  exportParams: {}
  storeParams: {}
  report:
    inputs: []
    outputs: []
$ cat +cmdag+step2+worker1.yml 
fullName: "+cmdag+step2+worker1"
state: "success"
result:
  subtaskConfig: {}
  exportParams: {}
  storeParams: {}
  report:
    inputs: []
    outputs: []
$ 

 

その他ドキュメントの内容について

『Getting Started』に関する部分は上記の内容となります。公式ドキュメントにはその他以下の様なセクションで内容が構成されています。読み応えがありますがとても分かり易く解説されている印象です。

Digdagの生みの親:古橋さんの関連ツイートが特徴を端的に紹介されているので、併せて引用させて頂きます。

Digdag、まずはEmbulkによるETL処理の自動化に最適。複数のデータソースから並列または直列にデータロード→日付ごとにテーブル作成→一次集計とJOIN…という処理を直感的に記述できる。 #digdag pic.twitter.com/IDEV2eVbjZ

— Sadayuki Furuhashi (@frsyuki) 2016年6月15日

パラメータ化が可能。引数やファイルから受け取ったパラメータを、設定ファイルや引数に埋め込んでからコマンドを実行できる。同じような処理を同じテーブルや同じデータソースに対して適用したい場合に効果的。 #digdag

— Sadayuki Furuhashi (@frsyuki) 2016年6月15日

for_each> や if> などのフロー制御ができ、プログラマブルなワークフローを組み立てられる。外部のスクリプトを呼んでパラメータをとってきて、その値に応じて次のアクションを変えたり、ループさせたりできる。https://t.co/Y7v21U3Sek #digdag

— Sadayuki Furuhashi (@frsyuki) 2016年6月15日

Dockerイメージを指定してタスクを実行できる、スケジューラ内蔵なのでcronがいらない、分散実行に対応、実行時間超過/失敗時のエラー通知、etc etc。今ワークフローエンジンをちゃんと作るとこうなるよね、という機能がほぼあるはず。 #digdag

— Sadayuki Furuhashi (@frsyuki) 2016年6月15日

正直なところ、コードはEmbulkより良くできているので、メンテナンスが楽そう…チームメンバーも2人いるし。 #digdag

— Sadayuki Furuhashi (@frsyuki) 2016年6月15日

 

参考資料

更にDigdagを理解する上での助けとなる情報は以下の通り。過去イベントで発表されたスライド資料及び動画、また、古橋さんの事前インタビュー記事も必見です。

スライド資料: 分散ワークフローエンジン『DigDag』の実装 at Tokyo RubyKaigi #11

YouTube動画: 【基調講演】分散ワークフローエンジン『Digdag』の実装

関連インタビュー記事

 

まとめ

以上、新しくリリースされたTreasure Data社によるOSSプロジェクト『Digdag』に関する『やってみた』エントリでした。上記で試した内容はまだほんの触りの部分ですので、これから折を見てドキュメントを読みながらDigdagによるワークフロー構築を実践して行きたいと思います!こちらからは以上です。

その他参考情報:

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.